【新機能】Amazon DynamoDB Triggersを使ってDynamoDB StreamsとAWS Lambdaを連携する
こんにちは、せーのです。今日は個人的にも待ちに待ったアップデートが入ってきました。そう、DynamoDB Streams!!! この機能を使うとデータの入出力がそのままストリームとして後方に流れていき、トリガーとしてEC2やLambdaと連携することができるのです。 もうポーリングとかそういう煩わしい作業は入りません。値が登録されたことをDB自らが教えてくれるのです。すごい!かっこいい!!
、、、多少取り乱しました。では中身を見ていきましょう。
DynamoDB Streamsとは
今回のアップデートでDynamoDB Streamsを有効にするとDynamoDB内でのアイテムの変化(登録、更新、削除)が24時間ストリームとして時間順に流されます。このストリームには簡単なAPIでアクセスすることができ、その中身を他のデータストア、例えばRDSやElastiCache等に使用することが可能となります。 アクセスするにはKinesis Client Library(KCL)をEC2にインストールし、そこからAPIを叩きます。そう、DynamoDB Streamsの仕組みはKinesisに非常に似ているんですね。というかもうお手軽なKinesisと言ってもいいかもしれません。なのでKinesisのデータを受け取る仕組みを使ってDynamoDB Streamsも受け取ることができます。 その他にもDynamoDBのSDKを使用してもDynamoDB Streamsを受け取ることが出来るようになっています(preview時にはなかったと思います。これは便利!)。KCLはちょっとハードルが高い、という方はこちらを使用すると簡単にデータを取ることが出来ます。
またこのストリームが流れた事をトリガーとして(これをDynamoDB Triggersといいます)Lambdaを発火させることができます。VPC内に配置されていないElastiCacheやS3等にデータを流す、またデータの中身を使って次のアクションを決める(SNSで通知する等)時にもこちらの機能が使えますね。今回はこちらを試してみたいと思います。
やってみた
では早速やってみましょう。マネージメントコンソールからDynamoDBを開きます。
テーブルを作ります。今回はID、年齢、名前という組み合わせのデータを想定します。ハッシュキーにIDを指定します。
一応IDと年齢でインデックスも作っておきます。今回は使いませんが。
スループットを指定します。テストなので読み書きとも1ユニットにしておきます。本来はここがDynamoDBで一番悩むところです。画面上にある「プロビジョニングする必要のあるスループットキャパシティーを計算するにはどうすればよいですか?」のチェックをクリックするとスループットの計算画面に遷移しますのでこちらを上手に使ってスループットを計算して下さい。
次にキャパシティの警告アラームの有無を聞かれます。今回は使わないのでチェックを外します。 そしていよいよここでストリームの有効化が出てきます。「表示タイプ」というのは実際にストリームに流れるデータの内容を表しています。「キーのみ」「新イメージ(データ更新後の内容)」「新旧イメージ(データ更新前の内容と更新後の内容を両方表示する)」の中から選びます。今回は「新旧イメージ」を選択します。
確認画面を通してテーブルを作成します。テーブルが出来ると新たに下に「ストリーム」というタブが増えていますので選択してみると、先ほどの内容でストリームができています。簡単ですね。
次にこのストリームに連携させるLambdaを作っていきます。マネージメントコンソールからLambdaを開いて新規のFunctionを作成するボタンを押下するとblueprintに「dynamodb-process-stream」というblueprintがありますのでこちらを使いましょう。blueprintについてはこちらの記事を御覧ください。
イベントソースを選択する画面ではDynamoDBの先程作成したテーブルを選択します。「Batch size」というのはこの関数一回に処理するストリームの数を表します。1件ずつ処理するなら1を、最大で10000まで指定できます。複数のレコードをいっぺんに処理するならLambda Functionの中でループ処理させることになりますね。 「Starting Position」というのは読み取りを開始する位置を表します「Latest(最新のものから順番に読み取る)」と「Trim horizon(読み取りされていないものを古い順から読み取る)」の2種類から選択します。時間順にデータを処理するならTrim horizon、最新のデータを使用する設計ならLatestを選びましょう。
Functionの作成画面です。今回はblueprintそのままでいきたいと思います。ちなみにDynamoDB TriggersはNode.jsでもJavaでもどちらでも使えます。
Lambda Functionに対する権限をIAM Roleにて設定します。今回はDynamoDBを使うのでDynamoDBとCloudWatch Logsの操作権限を付けます。これもサンプルとしてDynamoDB Stream用のRoleが用意されているのでそちらを選択して新規に作成するだけでOKです。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "lambda:InvokeFunction" ], "Resource": [ "*" ] }, { "Effect": "Allow", "Action": [ "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:DescribeStream", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }
確認画面でイベントソースをいますぐ有効にするかを選択することができます。一旦テストしてから有効にする場合は「Enable later」、Functionの内容が問題ない場合は「Enbale now」を選びましょう。今回は「Enable later」を選びます。
これでLambda Functionができました。Testボタンを押してFunctionをテストしてみます。
サンプルとしてのテストデータを入力する画面が出てきます。こちらで「DynamoDB Update」を選択すると一般的なDynamoDBのINSERT,UPDATE,DELETEのサンプルデータが出てきます。
こちらを元に今回のテーブル内容に合うようにテストデータを改修していきます。リージョンを東京に、データをID,age,nameに増やします。
{ "Records": [ { "eventID": "1", "eventName": "INSERT", "eventVersion": "1.0", "eventSource": "aws:dynamodb", "awsRegion": "ap-northeast-1", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" }, "age": { "N": "25" }, "name": { "S": "Tsuyoshi Seino" } }, "SequenceNumber": "111", "SizeBytes": 26, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:account-id:table/testchao2suke/stream/2015-06-27T00:48:05.899" }, { "eventID": "2", "eventName": "MODIFY", "eventVersion": "1.0", "eventSource": "aws:dynamodb", "awsRegion": "ap-northeast-1", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" }, "age": { "N": "39" }, "name": { "S": "Tsuyoshi Seino" } }, "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" }, "age": { "N": "25" }, "name": { "S": "Tsuyoshi Seino" } }, "SequenceNumber": "222", "SizeBytes": 59, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:account-id:table/testchao2suke/stream/2015-06-27T00:48:05.899" }, { "eventID": "3", "eventName": "REMOVE", "eventVersion": "1.0", "eventSource": "aws:dynamodb", "awsRegion": "us-west-2", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "OldImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" }, "age": { "N": "25" }, "name": { "S": "Tsuyoshi Seino" } }, "SequenceNumber": "333", "SizeBytes": 38, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:account-id:table/testchao2suke/stream/2015-06-27T00:48:05.899" } ] }
こちらでSubmitを押すとこのデータをLambda Functionに流してくれます。結果ログが下に出てきますので確認しましょう。
テストが問題なかったのでLambda Functionを有効化します。Lambda FunctionのStateをクリックします。
確認画面が出てくるのでDynamoDB Streamsがあっているかどうか確認して「Enable」ボタンを押します。これでDynamoDB StreamsとLambda Functionがつながりました。
ちなみにDynamoDB StreamsとLambda Functionの連携はDynamoDB側からもできます。DynamoDB Streamsの対象ストリームを選択し「Lambda関数を関連付ける」をクリックします。
すると先程Lambda Functionの作成時に最初に設定した読み取りの開始点とバッチサイズの指定ができます。この時点でLambda Functionを作っていなければ文章内のリンクからLambdaのコンソールに飛べますのでそちらから設定して下さい。DynamoDB Streamsの方は日本語化されていてLambdaの方は英語のままなので多少混乱しますね。
さて、どちらかの方法を使ってDynamoDB StreamsとLambda Functionを連携させたら実際にDynamoDBにデータを入れてみましょう。DynamoDBのテーブルを開いて「アイテムの作成」をクリックします。
データをJSON形式で作り「保存」をクリックします。
データが増えたことを確認します。これで既にDynamoDB Streamsにデータが流れ、Lambdaが発火しているはずです。
Lambdaが正常に発火したかどうかログを見て確認しましょう。CloudWatch Logsを開きます。マネージメントコンソールからCloudWatchにいき、「ログ」から先程作ったLambda Functionのログを開きます。
INSERTというタイプで先ほどのデータがLambdaにわたっていることが確認できます。
次にデータを更新してみます。DynamoDBに戻って先ほどのデータの年齢と名前を変えてみます。
再びCloudWatch Logsを見てみます。NewImageの所に変更後のデータが、OldImageのところには変更前のデータがあることが確認できます。この差分を使って例えばリアルタイムに更新されるデータポイントをグラフ化したりするわけです。
まとめ
いかがでしたでしょうか。やってみると簡単なことがわかるかと思います。こんなに簡単にデータドリブンのシステムが、しかもサーバーレスで作れる、というのは本当にいい時代になりました! エンジニアとしてベテランの方の中にはNo SQLのデータストアであるDynamoDBに抵抗のあった方も結構いるかと思います。私も最初は抵抗があったので。でも使ってみるとレスポンスの速さ等は特筆すべきところがあり、慣れればクエリやスキャンも自由にできるようになります。 これを機会にDynamoDBを触ってみて、サーバーレスなシステムを構築してみてはいかがでしょうか?
参考リンク
- https://aws.amazon.com/dynamodb/faqs/#triggers
- http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html
- https://aws.amazon.com/blogs/aws/dynamodb-update-triggers-streams-lambda-cross-region-replication-app/
- http://docs.aws.amazon.com/amazondynamodb/latest/developerguide//Streams.html